defmodule Kernel.ParallelCompiler do
  @moduledoc """
  A module responsible for compiling and requiring files in parallel.
  """

  @doc """
  Starts a task for parallel compilation.

  If you have a file that needs to compile other modules in parallel,
  the spawned processes need to be aware of the compiler environment.
  This function allows a developer to create a task that is aware of
  those environments.

  The task inherits the compiler pid, the file being compiled, the
  destination directory (when one is set) and the error handler of the
  calling process. It also stays associated with the file process that
  spawned it, so the compiler can relate anything the task waits on back
  to that file.

  Raises `ArgumentError` if the calling process is not compiling or
  requiring a file.

  See `Task.async/1` for more information. The task spawned must be
  always awaited on by calling `Task.await/1`.
  """
  @doc since: "1.6.0"
  def async(fun) when is_function(fun, 0) do
    case :erlang.get(:elixir_compiler_info) do
      {compiler_pid, file_pid} ->
        file = :erlang.get(:elixir_compiler_file)
        dest = :erlang.get(:elixir_compiler_dest)

        {:error_handler, error_handler} = :erlang.process_info(self(), :error_handler)
        checker = Module.ParallelChecker.get()

        Task.async(fn ->
          # Announce the task so the compiler monitors it and counts it as spawned.
          send(compiler_pid, {:async, self()})
          Module.ParallelChecker.put(compiler_pid, checker)

          # Propagate the *originating* file pid rather than self(): the compiler
          # looks files up by that pid, and the task has no file entry of its own.
          :erlang.put(:elixir_compiler_info, {compiler_pid, file_pid})
          :erlang.put(:elixir_compiler_file, file)
          dest != :undefined and :erlang.put(:elixir_compiler_dest, dest)
          :erlang.process_flag(:error_handler, error_handler)
          fun.()
        end)

      :undefined ->
        raise ArgumentError,
              "cannot spawn parallel compiler task because " <>
                "the current file is not being compiled/required"
    end
  end

  @doc """
  Compiles the given files.

  Those files are compiled in parallel and can automatically
  detect dependencies between them. Once a dependency is found,
  the current file stops being compiled until the dependency is
  resolved.

  It returns `{:ok, modules, warnings}` or `{:error, errors, warnings}`.

  Both errors and warnings are a list of three-element tuples containing
  the file, line and the formatted error/warning.

  When the `:warnings_as_errors` compiler option is enabled, any warning
  turns the result into `{:error, errors, []}`, with the warnings included
  in the list of errors.

  ## Options

    * `:each_file` - for each file compiled, invokes the callback passing the
      file. A two-arity callback is also accepted, in which case it is invoked
      with the file and the lexical value reported for that file

    * `:each_long_compilation` - for each file that takes more than a given
      timeout (see the `:long_compilation_threshold` option) to compile, invoke
      this callback passing the file as its argument

    * `:each_module` - for each module compiled, invokes the callback passing
      the file, module and the module bytecode

    * `:each_cycle` - after the given files are compiled, invokes this
      zero-arity function that should return one of the following values:
      * `{:compile, files, warnings}` - to continue compilation with a list of
        further files to compile (an empty list finishes the compilation)
      * `{:runtime, modules, warnings}` - to stop compilation and verify the list
        of modules because dependent modules have changed

    * `:long_compilation_threshold` - the timeout (in seconds) to check for modules
      taking too long to compile. For each file that exceeds the threshold, the
      `:each_long_compilation` callback is invoked. From Elixir v1.11, only the time
      spent compiling the actual module is taken into account by the threshold, the
      time spent waiting is not considered. Defaults to `10` seconds.

    * `:profile` - if set to `:time` measure the compilation time of each compilation cycle
       and group pass checker, printing the measurements to standard error

    * `:dest` - the destination directory for the BEAM files. When using `compile/2`,
      this information is only used to properly annotate the BEAM files before
      they are loaded into memory. If you want a file to actually be written to
      `dest`, use `compile_to_path/3` instead.

    * `:beam_timestamp` - the modification timestamp to give all BEAM files

  """
  @doc since: "1.6.0"
  def compile(files, options \\ []) when is_list(options) do
    spawn_workers(files, :compile, options)
  end

  @doc """
  Compiles the given files and writes resulting BEAM files into path.

  Each compiled module is written to `path` as a file named after the
  module with the `.beam` extension. If the `:beam_timestamp` option is
  given, it is applied to every file written.

  See `compile/2` for more information.
  """
  @doc since: "1.6.0"
  def compile_to_path(files, path, options \\ []) when is_binary(path) and is_list(options) do
    spawn_workers(files, {:compile, path}, options)
  end

  @doc """
  Requires the given files in parallel.

  Opposite to compile, dependencies are not attempted to be
  automatically solved between files. If a file references a module
  that is not available yet, it is not put on hold until another file
  defines it.

  It returns `{:ok, modules, warnings}` or `{:error, errors, warnings}`.

  Both errors and warnings are a list of three-element tuples containing
  the file, line and the formatted error/warning.

  ## Options

    * `:each_file` - for each file compiled, invokes the callback passing the
      file

    * `:each_module` - for each module compiled, invokes the callback passing
      the file, module and the module bytecode

  """
  @doc since: "1.6.0"
  def require(files, options \\ []) when is_list(options) do
    spawn_workers(files, :require, options)
  end

  @doc false
  @deprecated "Use Kernel.ParallelCompiler.compile/2 instead"
  def files(files, options \\ []) when is_list(options) do
    files
    |> spawn_workers(:compile, options)
    |> modules_or_exit()
  end

  @doc false
  @deprecated "Use Kernel.ParallelCompiler.compile_to_path/2 instead"
  def files_to_path(files, path, options \\ []) when is_binary(path) and is_list(options) do
    files
    |> spawn_workers({:compile, path}, options)
    |> modules_or_exit()
  end

  # Legacy contract of the deprecated entry points: hand back only the
  # compiled modules, or exit the calling process when compilation failed.
  defp modules_or_exit({:ok, modules, _warnings}), do: modules
  defp modules_or_exit({:error, _errors, _warnings}), do: exit({:shutdown, 1})

  # Shared entry point for compile/require: boots the parallel checker, runs
  # the compilation and converts the raw outcome into the public return value.
  # The checker is always stopped, even when compilation raises.
  defp spawn_workers(files, output, options) do
    {:module, _} = :code.ensure_loaded(Kernel.ErrorHandler)
    schedulers = max(:erlang.system_info(:schedulers_online), 2)
    {:ok, checker} = Module.ParallelChecker.start_link(schedulers)

    try do
      outcome = spawn_workers(schedulers, checker, files, output, options)
      # Read the option only after compiling, as it is consulted on the final outcome.
      warnings_as_errors? = Code.get_compiler_option(:warnings_as_errors)
      finish_compilation(outcome, warnings_as_errors?, output, options)
    after
      Module.ParallelChecker.stop(checker)
    end
  end

  # Successful compilation with warnings while warnings are treated as errors.
  defp finish_compilation({:ok, _compiled, [_ | _] = warnings}, true, _output, _options) do
    message = "Compilation failed due to warnings while using the --warnings-as-errors option"
    IO.puts(:stderr, message)
    {:error, warnings, []}
  end

  # Successful compilation: write the BEAM files (when requested) and list the modules.
  defp finish_compilation({:ok, compiled, warnings}, _warnings_as_errors?, output, options) do
    beam_timestamp = Keyword.get(options, :beam_timestamp)
    {:ok, write_module_binaries(compiled, output, beam_timestamp), warnings}
  end

  # Failed compilation with warnings as errors: warnings are reported as errors too.
  defp finish_compilation({:error, errors, warnings}, true, _output, _options) do
    {:error, errors ++ warnings, []}
  end

  defp finish_compilation({:error, errors, warnings}, _warnings_as_errors?, _output, _options) do
    {:error, errors, warnings}
  end

  # Builds the compiler state from the user options, runs the worker loop and
  # cleans up the long-compilation timer afterwards.
  defp spawn_workers(schedulers, checker, files, output, options) do
    # The option is given in seconds, the timer works in milliseconds.
    threshold = Keyword.get(options, :long_compilation_threshold, 10) * 1000
    timer_ref = Process.send_after(self(), :threshold_check, threshold)

    initial_state = %{
      dest: Keyword.get(options, :dest),
      each_cycle: Keyword.get(options, :each_cycle, fn -> {:runtime, [], []} end),
      each_file: each_file(Keyword.get(options, :each_file, fn _, _ -> :ok end)),
      each_long_compilation: Keyword.get(options, :each_long_compilation, fn _file -> :ok end),
      each_module: Keyword.get(options, :each_module, fn _file, _module, _binary -> :ok end),
      profile: profile_init(Keyword.get(options, :profile)),
      output: output,
      timer_ref: timer_ref,
      long_compilation_threshold: threshold,
      schedulers: schedulers,
      checker: checker
    }

    {outcome, final_state} = spawn_workers(files, 0, [], [], %{}, [], initial_state)

    # The timer is re-armed while compiling, so cancel the latest reference.
    Process.cancel_timer(final_state.timer_ref)

    # The timer may have fired right before being cancelled: drop the leftover message.
    receive do
      :threshold_check -> :ok
    after
      0 -> :ok
    end

    outcome
  end

  # Normalizes the `:each_file` callback to arity 2. Two-arity callbacks are
  # used as is; one-arity callbacks are wrapped and only receive the file.
  defp each_file(callback) when is_function(callback, 2), do: callback

  defp each_file(callback) when is_function(callback, 1) do
    fn file, _lexical -> callback.(file) end
  end

  # Runs in the worker once a file is done: tells the parent and blocks until
  # the parent acknowledges with the monitor reference, or until the parent dies.
  defp each_file(file, lexical, parent) do
    monitor = Process.monitor(parent)
    send(parent, {:file_ok, self(), monitor, file, lexical})

    receive do
      {:DOWN, ^monitor, _, _, _} -> :ok
      ^monitor -> :ok
    end
  end

  # Compiling to a path: write every compiled module to `<path>/<module>.beam`,
  # optionally touching it with the given timestamp, and return the module names.
  # Entries that are not compiled modules are skipped.
  defp write_module_binaries(result, {:compile, path}, timestamp) do
    for {{:module, module}, {binary, _info}} <- result do
      beam_path = Path.join(path, Atom.to_string(module) <> ".beam")
      File.write!(beam_path, binary)
      if timestamp, do: File.touch!(beam_path, timestamp)
      module
    end
  end

  # Any other output: nothing is written, only the module names are returned.
  defp write_module_binaries(result, _output, _timestamp) do
    Enum.flat_map(result, fn
      {{:module, module}, _} -> [module]
      _ -> []
    end)
  end

  ## Verification

  # Runs the module checker and builds the successful outcome. The warnings
  # collected so far are reversed and followed by the checker warnings.
  defp verify_modules(result, warnings, dependent_modules, state) do
    from_checker = maybe_check_modules(result, dependent_modules, state)
    {{:ok, result, Enum.reverse(warnings, from_checker)}, state}
  end

  # Collects the checker info of every module compiled in this run, plus the
  # on-disk location of each runtime module that has one, and verifies them
  # all through the parallel checker (timed when profiling is enabled).
  defp maybe_check_modules(result, runtime_modules, state) do
    %{profile: profile, checker: checker} = state

    compiled =
      Enum.flat_map(result, fn
        {{:module, _module}, {_binary, info}} -> [info]
        _ -> []
      end)

    # :code.which/1 returns an atom when there is no file for the module;
    # only non-empty paths are kept.
    runtime =
      Enum.flat_map(runtime_modules, fn module ->
        case :code.which(module) do
          [_ | _] = path -> [{module, path}]
          _ -> []
        end
      end)

    profile_checker(profile, compiled, runtime, fn ->
      Module.ParallelChecker.verify(checker, compiled, runtime)
    end)
  end

  # Initial profiling state: `:none` when profiling is off, otherwise the
  # start time of the first cycle and the number of modules counted so far.
  defp profile_init(nil), do: :none

  defp profile_init(:time) do
    started_at = System.monotonic_time()
    {:time, started_at, 0}
  end

  # Runs the checker function. When profiling, also reports on standard error
  # how long it took and how many modules were checked.
  defp profile_checker(:none, _compiled_modules, _runtime_modules, fun), do: fun.()

  defp profile_checker({:time, _, _}, compiled_modules, runtime_modules, fun) do
    {microseconds, result} = :timer.tc(fun)
    elapsed_ms = div(microseconds, 1000)
    total = length(compiled_modules) + length(runtime_modules)
    IO.puts(:stderr, "[profile] Finished group pass check of #{total} modules in #{elapsed_ms}ms")
    result
  end

  ## Compiler worker spawning

  # We already have n=schedulers currently running, don't spawn new ones.
  #
  # `spawned - length(waiting)` is the number of spawned processes that are not
  # blocked in the waiting list. Once that reaches the scheduler count, nothing
  # else is taken from the queue and we only process incoming messages.
  defp spawn_workers(
         queue,
         spawned,
         waiting,
         files,
         result,
         warnings,
         %{schedulers: schedulers} = state
       )
       when spawned - length(waiting) >= schedulers do
    wait_for_messages(queue, spawned, waiting, files, result, warnings, state)
  end

  # Release waiting processes: the queue head is a `{ref, reply}` pair, so
  # answer the process waiting with that reference and take it off the list.
  defp spawn_workers([{ref, found} | rest], spawned, waiting, files, result, warnings, state) do
    case List.keytake(waiting, ref, 2) do
      {{_kind, pid, ^ref, file_pid, _on, _defining, _deadlock}, still_waiting} ->
        send(pid, {ref, found})
        # The time elapsed since the last checkpoint was spent waiting.
        files = update_timing(files, file_pid, :waiting)
        spawn_workers(rest, spawned, still_waiting, files, result, warnings, state)

      nil ->
        # In case the waiting process died (for example, it was an async process),
        # it will no longer be on the list. So we need to take it into account here.
        spawn_workers(rest, spawned, waiting, files, result, warnings, state)
    end
  end

  # The queue head is a file: start a monitored worker process for it and
  # register its bookkeeping entry.
  defp spawn_workers([file | queue], spawned, waiting, files, result, warnings, state) do
    %{output: output, dest: dest, checker: checker} = state
    compiler = self()
    path = Path.expand(file)

    worker = fn ->
      # Store the compiler environment in the worker's process dictionary.
      Module.ParallelChecker.put(compiler, checker)
      :erlang.put(:elixir_compiler_info, {compiler, self()})
      :erlang.put(:elixir_compiler_file, path)

      try do
        case output do
          :require -> require_file(path, compiler)
          :compile -> compile_file(path, dest, compiler)
          {:compile, beam_dir} -> compile_file(path, beam_dir, compiler)
        end
      catch
        kind, reason ->
          send(compiler, {:file_error, self(), path, {kind, reason, __STACKTRACE__}})
      end

      exit(:shutdown)
    end

    {pid, ref} = :erlang.spawn_monitor(worker)

    entry = %{
      pid: pid,
      ref: ref,
      file: path,
      timestamp: System.monotonic_time(),
      compiling: 0,
      waiting: 0,
      warned: false
    }

    spawn_workers(queue, spawned + 1, waiting, [entry | files], result, warnings, state)
  end

  # No more queue, nothing waiting, this cycle is done.
  # Ask the `:each_cycle` callback whether there is anything left to compile.
  defp spawn_workers([], 0, [], [], result, warnings, state) do
    next_step = state.each_cycle.() |> each_cycle_return()
    timed_state = cycle_timing(result, state)

    case next_step do
      {:compile, [], cycle_warnings} ->
        verify_modules(result, cycle_warnings ++ warnings, [], timed_state)

      {:compile, more_files, cycle_warnings} ->
        spawn_workers(more_files, 0, [], [], result, cycle_warnings ++ warnings, timed_state)

      {:runtime, dependent_modules, cycle_warnings} ->
        verify_modules(result, cycle_warnings ++ warnings, dependent_modules, timed_state)
    end
  end

  # files x, waiting for x: POSSIBLE ERROR! Release processes so we get the failures

  # Single entry, just release it.
  #
  # The queue is empty and the only spawned process is the only file left, and
  # it is the one waiting (same pid in both lists). Nothing else is running that
  # could provide what it waits for, so it is answered with :not_found by
  # queueing a `{ref, :not_found}` release.
  defp spawn_workers(
         [],
         1,
         [{_, pid, ref, _, _, _, _}] = waiting,
         [%{pid: pid}] = files,
         result,
         warnings,
         state
       ) do
    spawn_workers([{ref, :not_found}], 1, waiting, files, result, warnings, state)
  end

  # Multiple entries, try to release modules.
  defp spawn_workers([], spawned, waiting, files, result, warnings, state)
       when length(waiting) == spawned do
    # There is potentially a deadlock. We will release modules with
    # the following order:
    #
    #   1. Code.ensure_compiled/1 checks without a known definition (deadlock = soft)
    #   2. Code.ensure_compiled/1 checks with a known definition (deadlock = soft)
    #   3. Struct/import/require/ensure_compiled! checks without a known definition (deadlock = hard)
    #   4. Modules without a known definition
    #   5. Code invocation (deadlock = raise)
    #
    # The reason for step 3 and 4 is to not treat typos as deadlocks and
    # help developers handle those sooner. However, this can have false
    # positives in case multiple modules are defined in the same file
    # and the module we are waiting for is defined later on.
    #
    # Finally, note there is no difference between hard and raise, the
    # difference is where the raise is happening, inside the compiler
    # or in the caller.

    # Each strategy is only evaluated if the previous ones released nothing.
    strategies = [
      fn -> deadlocked(waiting, :soft, false) end,
      fn -> deadlocked(waiting, :soft, true) end,
      fn -> deadlocked(waiting, :hard, false) end,
      fn -> without_definition(waiting, files) end
    ]

    case Enum.find_value(strategies, fn strategy -> strategy.() end) do
      nil ->
        errors = handle_deadlock(waiting, files)
        {{:error, errors, warnings}, state}

      releasable ->
        spawn_workers(releasable, spawned, waiting, files, result, warnings, state)
    end
  end

  # No more queue, but spawned and length(waiting) do not match.
  # Some process is still running, so wait for it to report back.
  defp spawn_workers([], spawned, waiting, files, result, warnings, state) do
    wait_for_messages([], spawned, waiting, files, result, warnings, state)
  end

  # Runs inside a worker: installs the compiler error handler, records the
  # destination in the process dictionary and compiles the file, reporting
  # back to the parent once the file is done.
  defp compile_file(file, path, parent) do
    :erlang.process_flag(:error_handler, Kernel.ErrorHandler)
    :erlang.put(:elixir_compiler_dest, path)

    on_file_done = fn compiled, lexical -> each_file(compiled, lexical, parent) end
    :elixir_compiler.file(file, on_file_done)
  end

  # Runs inside a worker: asks the code server whether the file still has to
  # be required. If so, compiles it and marks it as required; otherwise tells
  # the parent that this file was cancelled.
  defp require_file(file, parent) do
    case :elixir_code_server.call({:acquire, file}) do
      :proceed ->
        on_file_done = fn required, lexical -> each_file(required, lexical, parent) end
        :elixir_compiler.file(file, on_file_done)
        :elixir_code_server.cast({:required, file})

      :required ->
        send(parent, {:file_cancel, self()})
    end
  end

  # When profiling, reports on standard error how many modules were compiled in
  # the cycle that just ended and how long it took, then starts the next cycle.
  defp cycle_timing(result, %{profile: {:time, cycle_start, module_counter}} = state) do
    total_modules = count_modules(result)
    compiled_in_cycle = total_modules - module_counter

    finished_at = System.monotonic_time()
    elapsed_ms = System.convert_time_unit(finished_at - cycle_start, :native, :millisecond)

    message =
      "[profile] Finished compilation cycle of #{compiled_in_cycle} modules in #{elapsed_ms}ms"

    IO.puts(:stderr, message)

    %{state | profile: {:time, finished_at, total_modules}}
  end

  # Profiling is disabled: nothing to report.
  defp cycle_timing(_result, %{profile: :none} = state), do: state

  # Counts the entries of `result` keyed by `{:module, _}`.
  defp count_modules(result) do
    Enum.count(result, fn
      {{:module, _}, _} -> true
      _ -> false
    end)
  end

  # Normalizes the value returned by the `:each_cycle` callback into a
  # `{kind, modules, warnings}` tuple. Besides the three-element tuple, a
  # two-element `{kind, modules}` tuple (no warnings) and a bare list (treated
  # as `{:compile, list, []}`) are accepted.
  # TODO: Deprecate other returns on v1.14
  defp each_cycle_return({kind, modules, warnings}), do: {kind, modules, warnings}
  defp each_cycle_return({kind, modules}), do: {kind, modules, []}
  defp each_cycle_return(modules) when is_list(modules), do: {:compile, modules, []}

  # The goal of this function is to find leaves in the dependency graph,
  # i.e. to find code that depends on code that we know is not being defined.
  # Note that not all files have been compiled yet, so they may not be in waiting.
  #
  # Returns `{ref, :not_found}` releases for those entries, or nil if there are none.
  defp without_definition(waiting, files) do
    file_pids = for %{pid: pid} <- files, do: pid

    orphans =
      for file_pid <- file_pids,
          {_, _, ref, ^file_pid, on, _, _} <- waiting,
          not defining?(on, waiting),
          do: {ref, :not_found}

    nillify_empty(orphans)
  end

  # Builds `{ref, :deadlock}` releases for the waiting entries of the given
  # deadlock `type` whose awaited module is (or is not, per `defining?`) being
  # defined by some waiting process. Returns nil when no entry qualifies.
  defp deadlocked(waiting, type, defining?) do
    waiting
    |> Enum.flat_map(fn
      {_, _, ref, _, on, _, ^type} ->
        if defining?(on, waiting) == defining?, do: [{ref, :deadlock}], else: []

      _other ->
        []
    end)
    |> nillify_empty()
  end

  # Returns true if any waiting entry lists `on` among the modules it is defining.
  defp defining?(on, waiting) do
    Enum.any?(waiting, fn {_kind, _pid, _ref, _file_pid, _on, defining, _deadlock} ->
      Enum.member?(defining, on)
    end)
  end

  # Turns an empty list into nil and returns non-empty lists unchanged, so the
  # result can be used directly in `||` chains and `if` checks.
  defp nillify_empty([]), do: nil
  defp nillify_empty([_ | _] = list), do: list

  # Wait for messages from child processes.
  #
  # Blocks until one message arrives, updates the bookkeeping accordingly and
  # then either goes back to spawn_workers/7 (which may start more workers or
  # release waiting ones), keeps waiting, or returns `{{:error, errors, warnings}, state}`
  # when a file failed.
  #
  # Arguments:
  #   * queue    - files still to be spawned and `{ref, reply}` releases to deliver
  #   * spawned  - number of live processes being tracked (file workers and async tasks)
  #   * waiting  - `{kind, pid, ref, file_pid, on, defining, deadlock}` entries of
  #                processes blocked on something
  #   * files    - bookkeeping maps of the files currently being compiled
  #   * result   - map of what has become available so far
  #   * warnings - warnings accumulated so far (most recent first)
  #   * state    - options, callbacks and timer state
  defp wait_for_messages(queue, spawned, waiting, files, result, warnings, state) do
    %{output: output} = state

    receive do
      # A task started via async/1 announces itself: monitor it and count it
      # as a spawned process.
      {:async, process} ->
        Process.monitor(process)
        wait_for_messages(queue, spawned + 1, waiting, files, result, warnings, state)

      # Something of the given kind became available: record it and queue a
      # :found release for every process waiting on exactly that kind and module.
      {:available, kind, module} ->
        available =
          for {^kind, _, ref, _, ^module, _defining, _deadlock} <- waiting,
              do: {ref, :found}

        result = Map.put(result, {kind, module}, true)
        spawn_workers(available ++ queue, spawned, waiting, files, result, warnings, state)

      # A module finished compiling: run the user callback, store its binary
      # and checker info, and release the processes waiting on that module.
      {:module_available, child, ref, file, module, binary, checker_info} ->
        state.each_module.(file, module, binary)

        # Release the module loader which is waiting for an ack
        send(child, {ref, :ack})

        available =
          for {:module, _, ref, _, ^module, _defining, _deadlock} <- waiting,
              do: {ref, :found}

        result = Map.put(result, {:module, module}, {binary, checker_info})
        spawn_workers(available ++ queue, spawned, waiting, files, result, warnings, state)

      # If we are simply requiring files, we do not add to waiting.
      {:waiting, _kind, child, ref, _file_pid, _on, _defining, _deadlock} when output == :require ->
        send(child, {ref, :not_found})
        spawn_workers(queue, spawned, waiting, files, result, warnings, state)

      {:waiting, kind, child_pid, ref, file_pid, on, defining, deadlock?} ->
        # If we already got what we were waiting for, do not put it on waiting.
        # If we're waiting on ourselves, send :found so that we can crash with
        # a better error.
        {files, waiting} =
          if Map.has_key?(result, {kind, on}) or on in defining do
            send(child_pid, {ref, :found})
            {files, waiting}
          else
            # The time elapsed so far counts as compiling; from here on the
            # file is waiting.
            files = update_timing(files, file_pid, :compiling)
            {files, [{kind, child_pid, ref, file_pid, on, defining, deadlock?} | waiting]}
          end

        spawn_workers(queue, spawned, waiting, files, result, warnings, state)

      # Periodic timer: refresh the compile time of every file that has not
      # been warned about and is not waiting, warn about the ones above the
      # threshold, and schedule the next check.
      :threshold_check ->
        files =
          for data <- files do
            if data.warned or List.keymember?(waiting, data.pid, 1) do
              data
            else
              data = update_timing(data, :compiling)
              data = maybe_warn_long_compilation(data, state)
              data
            end
          end

        timer_ref = Process.send_after(self(), :threshold_check, state.long_compilation_threshold)
        state = %{state | timer_ref: timer_ref}
        spawn_workers(queue, spawned, waiting, files, result, warnings, state)

      # A warning was reported: normalize it to `{file, line, message}` and
      # accumulate it.
      {:warning, file, line, message} ->
        file = file && Path.absname(file)
        message = :unicode.characters_to_binary(message)
        warning = {file, line, message}
        wait_for_messages(queue, spawned, waiting, files, result, [warning | warnings], state)

      # A file was processed successfully: run the user callback, acknowledge
      # the worker so it can exit, and drop the file from the bookkeeping.
      {:file_ok, child_pid, ref, file, lexical} ->
        state.each_file.(file, lexical)
        send(child_pid, ref)

        discard_down(child_pid)
        new_files = discard_and_maybe_log_file(files, child_pid, state)

        # Sometimes we may have spurious entries in the waiting list
        # because someone invoked try/rescue UndefinedFunctionError
        new_waiting = List.keydelete(waiting, child_pid, 1)
        spawn_workers(queue, spawned - 1, new_waiting, new_files, result, warnings, state)

      # The worker found that the file was already required and did nothing.
      {:file_cancel, child_pid} ->
        discard_down(child_pid)
        new_files = Enum.reject(files, &(&1.pid == child_pid))
        spawn_workers(queue, spawned - 1, waiting, new_files, result, warnings, state)

      # A file failed: print the error, kill every other file worker and
      # abort with that single error.
      {:file_error, child_pid, file, {kind, reason, stack}} ->
        print_error(file, kind, reason, stack)
        discard_down(child_pid)
        files |> Enum.reject(&(&1.pid == child_pid)) |> terminate()
        {{:error, [to_error(file, kind, reason, stack)], warnings}, state}

      # A monitored process went down without reporting through one of the
      # messages above. handle_down/3 decides whether that is an error.
      {:DOWN, ref, :process, pid, reason} ->
        waiting = List.keydelete(waiting, pid, 1)

        case handle_down(files, ref, reason) do
          :ok -> wait_for_messages(queue, spawned - 1, waiting, files, result, warnings, state)
          {:error, errors} -> {{:error, errors, warnings}, state}
        end
    end
  end

  # Adds the time elapsed since the last checkpoint to the `key` counter
  # (`:compiling` or `:waiting`) of the file owned by `pid` and moves its
  # checkpoint to now. Other files are left untouched.
  defp update_timing(files, pid, key) do
    for data <- files do
      if data.pid == pid do
        now = System.monotonic_time()
        accumulated = Map.get(data, key) + now - data.timestamp
        %{data | key => accumulated, timestamp: now}
      else
        data
      end
    end
  end

  # Adds the time elapsed since the file's last checkpoint to its `key`
  # counter and moves the checkpoint to now.
  defp update_timing(data, key) do
    now = System.monotonic_time()
    accumulated = Map.get(data, key) + now - data.timestamp
    %{data | key => accumulated, timestamp: now}
  end

  # Invokes the `:each_long_compilation` callback, at most once per file, when
  # the file's accumulated compile time has reached the threshold (which is
  # kept in milliseconds). Returns the file data, flagged as warned if it fired.
  defp maybe_warn_long_compilation(data, state) do
    elapsed_ms = System.convert_time_unit(data.compiling, :native, :millisecond)
    over_threshold? = elapsed_ms >= state.long_compilation_threshold

    if data.warned or not over_threshold? do
      data
    else
      state.each_long_compilation.(data.file)
      %{data | warned: true}
    end
  end

  # Removes the file owned by `pid` from the bookkeeping list. Before dropping
  # it, its compile time is brought up to date, the long-compilation warning
  # gets a last chance to fire and, when profiling, its timings are printed.
  defp discard_and_maybe_log_file(files, pid, state) do
    {finished, remaining} = Enum.split_with(files, fn data -> data.pid == pid end)

    Enum.each(finished, fn data ->
      data =
        data
        |> update_timing(:compiling)
        |> maybe_warn_long_compilation(state)

      if state.profile != :none do
        compiling = to_padded_ms(data.compiling)
        waiting = to_padded_ms(data.waiting)
        relative = Path.relative_to_cwd(data.file)
        line = "[profile] #{compiling}ms compiling + #{waiting}ms waiting for #{relative}"
        IO.puts(:stderr, line)
      end
    end)

    remaining
  end

  # Converts a native-unit duration to milliseconds and renders it
  # right-aligned in a column at least six characters wide.
  defp to_padded_ms(time) do
    milliseconds = System.convert_time_unit(time, :native, :millisecond)
    String.pad_leading(Integer.to_string(milliseconds), 6, " ")
  end

  # Removes the :DOWN message of `pid` from the mailbox, blocking until it
  # arrives. There is no timeout, so this must only be called for a monitored
  # process that is exiting or has already exited.
  defp discard_down(pid) do
    receive do
      {:DOWN, _, :process, ^pid, _} -> :ok
    end
  end

  # Decides what a :DOWN message means. A normal exit, or the exit of a process
  # that is not a tracked file, is fine. Any other exit of a file worker is a
  # compilation error: it is printed, the remaining workers are killed and the
  # error is returned.
  defp handle_down(files, ref, reason) do
    if reason == :normal do
      :ok
    else
      case Enum.find(files, fn data -> data.ref == ref end) do
        nil ->
          :ok

        %{pid: pid, file: file} ->
          print_error(file, :exit, reason, [])
          files |> Enum.reject(fn data -> data.pid == pid end) |> terminate()
          {:error, [to_error(file, :exit, reason, [])]}
      end
    end
  end

  # Called when every remaining file is waiting and none can be released.
  # Kills each file worker, prints one error per file plus a summary of which
  # file waited on which module, and returns the `{file, line, message}` errors.
  defp handle_deadlock(waiting, files) do
    entries =
      for %{pid: pid, file: file} <- files do
        # Grab the stacktrace before killing the process.
        {:current_stacktrace, trace} = Process.info(pid, :current_stacktrace)
        Process.exit(pid, :kill)

        {kind, ^pid, _, _, on, _, _} = List.keyfind(waiting, pid, 1)
        description = "deadlocked waiting on #{kind} #{inspect(on)}"
        exception = CompileError.exception(description: description, file: nil, line: nil)
        print_error(file, :error, exception, trace)
        {Path.relative_to_cwd(file), on, description}
      end

    IO.puts("""

    Compilation failed because of a deadlock between files.
    The following files depended on the following modules:
    """)

    # Right-align the file names on the longest one.
    width =
      entries
      |> Enum.map(fn {path, _on, _description} -> String.length(path) end)
      |> Enum.max()

    Enum.each(entries, fn {path, mod, _description} ->
      IO.puts(["  ", String.pad_leading(path, width), " => " | inspect(mod)])
    end)

    IO.puts(
      "\nEnsure there are no compile-time dependencies between those files " <>
        "and that the modules they reference exist and are correctly named\n"
    )

    Enum.map(entries, fn {path, _on, description} ->
      {Path.absname(path), nil, description}
    end)
  end

  # Kills every given file worker and then consumes their :DOWN messages.
  # All workers are killed first so they go down concurrently.
  defp terminate(files) do
    pids = for %{pid: pid} <- files, do: pid

    Enum.each(pids, fn pid -> Process.exit(pid, :kill) end)
    Enum.each(pids, fn pid -> discard_down(pid) end)
    :ok
  end

  # Writes a compilation error banner for `file`, followed by the formatted
  # error, in a single write.
  defp print_error(file, kind, reason, stack) do
    banner = "\n== Compilation error in file #{Path.relative_to_cwd(file)} ==\n"
    details = Kernel.CLI.format_error(kind, reason, stack)
    IO.write([banner, details])
  end

  # Builds the `{file, line, message}` tuple returned to callers for an error:
  # absolute file path, best-effort line (may be nil) and the formatted message.
  defp to_error(file, kind, reason, stack) do
    line = get_line(file, reason, stack)
    absolute = Path.absname(file)

    message =
      kind
      |> Kernel.CLI.format_error(reason, stack)
      |> :unicode.characters_to_binary()

    {absolute, line, message}
  end

  # Best-effort lookup of the line an error belongs to; returns nil if unknown.

  # The error itself carries a positive line.
  defp get_line(_file, %{line: line}, _stack) when is_integer(line) and line > 0, do: line

  # Undefined function: the first stack entry has no location, use the second one.
  defp get_line(file, :undef, [{_, _, _, []}, {_, _, _, info} | _]) do
    line_from_info(info, file)
  end

  # Macro expansion: skip the "expanding macro" entry and use the one below it.
  defp get_line(file, _reason, [{_, _, _, [file: 'expanding macro']}, {_, _, _, info} | _]) do
    line_from_info(info, file)
  end

  # Otherwise use the top of the stack.
  defp get_line(file, _reason, [{_, _, _, info} | _]) do
    line_from_info(info, file)
  end

  defp get_line(_, _, _), do: nil

  # Returns the line recorded in a stack entry's location info, but only when
  # the entry points at `file` (compared relative to the current directory).
  defp line_from_info(info, file) do
    recorded = Keyword.get(info, :file)
    expected = to_charlist(Path.relative_to_cwd(file))
    if recorded == expected, do: Keyword.get(info, :line)
  end
end
